 1.Flink Window窗口机制
   
   Flink Window 背景
   Flink认为Batch是Streaming的一个特例，因此Flink底层引擎是一个流式引擎，在上面实现了流处
理和批处理。而Window就是从Streaming到Batch的桥梁。
   通俗讲，Window是用来对一个无限的流设置一个有限的集合，从而在有界的数据集上进行操作的
一种机制。流上的集合由Window来划定范围，比如“计算过去10分钟”或者“最后50个元素的和”。
   Window可以由时间（Time Window）（比如每30s）或者数据（Count Window）（如每100个
元素）驱动。DataStream API提供了Time和Count的Window。
   Flink Window 总览
   Window 是 Flink 处理无限流的核心，Window 将流拆分为有限大小的“桶”，我们可以在其上应用计
算。
   Flink 认为 Batch 是 Streaming 的一个特例，所以 Flink 底层引擎是一个流式引擎，在上面实现了
流处理和批处理。
   而窗口（window）就是从 Streaming 到 Batch 的一个桥梁。
   Flink 提供了非常完善的窗口机制。
   在流处理应用中，数据是连续不断的，因此我们不可能等到所有数据都到了才开始处理。
   当然我们可以每来一个消息就处理一次，但是有时我们需要做一些聚合类的处理，例如：在过去的
1 分钟内有多少用户点击了我们的网页。
   在这种情况下，我们必须定义一个窗口，用来收集最近一分钟内的数据，并对这个窗口内的数据进
行计算。
   窗口可以是基于时间驱动的（Time Window，例如：每30秒钟）
   也可以是基于数据驱动的（Count Window，例如：每一百个元素）
   同时基于不同事件驱动的窗口又可以分成以下几类：
       翻滚窗口 (Tumbling Window, 无重叠)
       滑动窗口 (Sliding Window, 有重叠)
       会话窗口 (Session Window, 活动间隙)
       全局窗口 (略)
   Flink要操作窗口，先得将StreamSource 转成WindowedStream
   步骤：
   1、 获取流数据源
   2、 获取窗口
   3、 操作窗口数据
   4、 输出窗口数据
 
 2.时间窗口(TimeWindow)
   
   1).滚动时间窗口(Tumbling Window)
   将数据依据固定的窗口长度对数据进行切分
   特点：时间对齐，窗口长度固定，没有重叠
   代码示例
package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * Tumbling time-window demo driven by a self-generating source.
 *
 * <p>The job follows four steps: obtain a source, build a window, process the
 * window contents, and print the result.
 */
public class WindowDemo {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Obtain the streaming data source: emits "<count>号数据源" once per second.
        DataStreamSource<String> data = env.addSource(new SourceFunction<String>() {
            int count = 0;
            // Cleared by cancel() so run() can leave its loop. Declared volatile because
            // cancel() may be called from a different thread than the one executing run().
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect(count + "号数据源");
                    count++;
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                // Previously a no-op, which left run() spinning in while(true) after cancellation.
                running = false;
            }
        });
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // 2. Build the window: tag each record with (value, formatted wall-clock time, random 0..4).
        SingleOutputStreamOperator<Tuple3<String, String, String>> maped = data.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String dataTime = sdf.format(System.currentTimeMillis());
                int randomNum = new Random().nextInt(5);
                return new Tuple3<>(value, dataTime, String.valueOf(randomNum));
            }
        });
        // Keyed by the record text itself; the source never repeats a value, so every key is distinct.
        KeyedStream<Tuple3<String, String, String>, String> keybyed = maped.keyBy(value -> value.f0);

        // Tumbling time window of 5 seconds per key.
        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> timeWindow = keybyed.timeWindow(Time.seconds(5));

        // 3. Process the window data: emit "<key>...<window start millis>...<concatenated elements>".
        SingleOutputStreamOperator<String> applyed = timeWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                StringBuilder sb = new StringBuilder();
                for (Tuple3<String, String, String> next : input) {
                    sb.append(next.f0).append("...").append(next.f1).append("...").append(next.f2);
                }
                out.collect(s + "..." + window.getStart() + "..." + sb);
            }
        });
        // 4. Output the window data.
        applyed.print();
        env.execute();
    }
}
   
   
   (1).基于时间驱动
   场景：我们需要统计每一分钟中用户购买的商品的总数，需要将用户的行为事件按每一分钟进行切
分，这种切分被称为翻滚时间窗口（Tumbling Time Window）
package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * Tumbling time-window demo that reads its input from a text socket.
 *
 * <p>A self-generating source is also declared as an alternative input, but the
 * pipeline below consumes only the socket stream.
 */
public class WindowDemo {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Obtain the streaming data source: lines read from linux121:7777.
        DataStreamSource<String> data = env.socketTextStream("linux121", 7777);
        // Alternative source emitting "<count>号数据源" once per second; not consumed below.
        DataStreamSource<String> data1 = env.addSource(new SourceFunction<String>() {
            int count = 0;
            // Cleared by cancel() so run() can leave its loop. Declared volatile because
            // cancel() may be called from a different thread than the one executing run().
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect(count + "号数据源");
                    count++;
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                // Previously a no-op, which left run() spinning in while(true) after cancellation.
                running = false;
            }
        });
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // 2. Build the window: tag each line with (value, formatted wall-clock time, random 0..4).
        SingleOutputStreamOperator<Tuple3<String, String, String>> maped = data.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String dataTime = sdf.format(System.currentTimeMillis());
                int randomNum = new Random().nextInt(5);
                return new Tuple3<>(value, dataTime, String.valueOf(randomNum));
            }
        });
        // Identical input lines share a key and therefore share a window.
        KeyedStream<Tuple3<String, String, String>, String> keybyed = maped.keyBy(value -> value.f0);

        // Tumbling time window of 5 seconds per key.
        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> timeWindow = keybyed.timeWindow(Time.seconds(5));

        // 3. Process the window data: emit "<key>...<window start millis>...<concatenated elements>".
        SingleOutputStreamOperator<String> applyed = timeWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                StringBuilder sb = new StringBuilder();
                for (Tuple3<String, String, String> next : input) {
                    sb.append(next.f0).append("...").append(next.f1).append("...").append(next.f2);
                }
                out.collect(s + "..." + window.getStart() + "..." + sb);
            }
        });
        // 4. Output the window data.
        applyed.print();
        env.execute();
    }
}

   (2).基于事件驱动
   场景：当我们想要每100个用户的购买行为作为驱动，那么每当窗口中填满100个“相同”元素了，就
会对窗口进行计算。
package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * Count-driven tumbling window demo: a window fires for a key once three
 * elements with that key have arrived.
 */
public class WindowDemoCount {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input: text lines read from linux121:7777.
        DataStreamSource<String> lines = env.socketTextStream("linux121", 7777);
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // Tag each line with (value, formatted wall-clock time, random 0..4).
        SingleOutputStreamOperator<Tuple3<String, String, String>> tagged = lines.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String stamp = formatter.format(System.currentTimeMillis());
                String bucket = String.valueOf(new Random().nextInt(5));
                return new Tuple3<>(value, stamp, bucket);
            }
        });

        // Identical input lines share a key, so the count is kept per distinct line.
        KeyedStream<Tuple3<String, String, String>, String> byValue = tagged.keyBy(record -> record.f0);
        WindowedStream<Tuple3<String, String, String>, String, GlobalWindow> everyThree = byValue.countWindow(3);

        // Concatenate every element of the fired window into one output string.
        SingleOutputStreamOperator<String> joined = everyThree.apply(new WindowFunction<Tuple3<String, String, String>, String, String, GlobalWindow>() {
            @Override
            public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                StringBuilder text = new StringBuilder();
                for (Tuple3<String, String, String> element : input) {
                    text.append(element.f0).append("...").append(element.f1).append("...").append(element.f2);
                }
                out.collect(text.toString());
            }
        });

        joined.print();
        env.execute();
    }
}

   2).滑动时间窗口(Sliding Window)
   滑动窗口是固定窗口的更广义的一种形式，滑动窗口由固定的窗口长度和滑动间隔组成
   特点：窗口长度固定，可以有重叠
   (1).基于时间的滑动窗口
   场景: 我们可以每30秒计算一次最近一分钟用户购买的商品总数
package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * Sliding time-window demo: a 5-second window that advances every 2 seconds,
 * fed by a self-generating source.
 *
 * <p>A socket stream is also declared as an alternative input, but the pipeline
 * below consumes only the self-generating source.
 */
public class WindowDemo1 {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Obtain the streaming data source.
        // Alternative socket input from linux121:7777; not consumed below.
        DataStreamSource<String> data1 = env.socketTextStream("linux121", 7777);
        // Active source: emits "<count>号数据源" once per second.
        DataStreamSource<String> data = env.addSource(new SourceFunction<String>() {
            int count = 0;
            // Cleared by cancel() so run() can leave its loop. Declared volatile because
            // cancel() may be called from a different thread than the one executing run().
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect(count + "号数据源");
                    count++;
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                // Previously a no-op, which left run() spinning in while(true) after cancellation.
                running = false;
            }
        });
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // 2. Build the window: tag each record with (value, formatted wall-clock time, random 0..4).
        SingleOutputStreamOperator<Tuple3<String, String, String>> maped = data.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String dataTime = sdf.format(System.currentTimeMillis());
                int randomNum = new Random().nextInt(5);
                return new Tuple3<>(value, dataTime, String.valueOf(randomNum));
            }
        });
        // Keyed by the record text itself; the source never repeats a value, so every key is distinct.
        KeyedStream<Tuple3<String, String, String>, String> keybyed = maped.keyBy(value -> value.f0);

        // Sliding time window: 5 seconds long, sliding every 2 seconds.
        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> timeWindow = keybyed.timeWindow(Time.seconds(5), Time.seconds(2));

        // 3. Process the window data: emit "<key>...<window start>...<window end>...<concatenated elements>".
        SingleOutputStreamOperator<String> applyed = timeWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                StringBuilder sb = new StringBuilder();
                for (Tuple3<String, String, String> next : input) {
                    sb.append(next.f0).append("...").append(next.f1).append("...").append(next.f2);
                }
                String windowStart = sdf.format(window.getStart());
                String windowEnd = sdf.format(window.getEnd());
                out.collect(s + "..." + windowStart + "..." + windowEnd + "..." + sb);
            }
        });
        // 4. Output the window data.
        applyed.print();
        env.execute();
    }
}

   (2).基于事件的滑动窗口
   场景: 每10个 “相同”元素计算一次最近100个元素的总和
   代码实现
package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * Count-driven sliding window demo: per key, a window of up to three elements
 * is evaluated each time one new element arrives.
 */
public class WindowDemoCount1 {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input: text lines read from linux121:7777.
        DataStreamSource<String> lines = env.socketTextStream("linux121", 7777);

        // Tag each line with (value, formatted wall-clock time, random 0..4).
        SingleOutputStreamOperator<Tuple3<String, String, String>> tagged = lines.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                // A fresh formatter is created for every record, as in the original demo.
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                String stamp = formatter.format(System.currentTimeMillis());
                String bucket = String.valueOf(new Random().nextInt(5));
                return new Tuple3<>(value, stamp, bucket);
            }
        });

        // Identical input lines share a key, so counting happens per distinct line.
        KeyedStream<Tuple3<String, String, String>, String> byValue = tagged.keyBy(record -> record.f0);
        // Window size 3, slide 1.
        WindowedStream<Tuple3<String, String, String>, String, GlobalWindow> slidingCount = byValue.countWindow(3, 1);

        // Concatenate every element of the fired window into one output string.
        SingleOutputStreamOperator<String> joined = slidingCount.apply(new WindowFunction<Tuple3<String, String, String>, String, String, GlobalWindow>() {
            @Override
            public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                StringBuilder text = new StringBuilder();
                for (Tuple3<String, String, String> element : input) {
                    text.append(element.f0).append("...").append(element.f1).append("...").append(element.f2);
                }
                out.collect(text.toString());
            }
        });

        joined.print();
        env.execute();
    }
}

   3).会话窗口（Session Window）
   由一系列事件组合一个指定时间长度的timeout间隙组成，类似于web应用的session，也就是一段
时间没有接收到新数据就会生成新的窗口。
   session窗口分配器通过session活动来对元素进行分组，session窗口跟滚动窗口和滑动窗口相比，不
会有重叠和固定的开始时间和结束时间的情况
   session窗口在一个固定的时间周期内不再收到元素，即非活动间隔产生，那么这个窗口就会关闭。
   一个session窗口通过一个session间隔来配置，这个session间隔定义了非活跃周期的长度，当这个非
活跃周期产生，那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。
   
   特点
   会话窗口不重叠，没有固定的开始和结束时间
   与翻滚窗口和滑动窗口相反, 当会话窗口在一段时间内没有接收到元素时会关闭会话窗口。
   后续的元素将会被分配给新的会话窗口
   案例描述
   计算每个用户在活跃期间总共购买的商品数量，如果用户30秒没有活动则视为会话断开
   代码实现
package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Session-window demo: elements with the same key are grouped into one session
 * until no element for that key arrives within the configured gap.
 */
public class WindowDemoSession {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input: text lines read from linux121:7777.
        DataStreamSource<String> lines = env.socketTextStream("linux121", 7777);

        // Pass-through map; each line is forwarded unchanged.
        SingleOutputStreamOperator<String> passedThrough = lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });

        // Each distinct line is its own key.
        KeyedStream<String, String> byLine = passedThrough.keyBy(line -> line);
        // Processing-time session windows with a 10-second inactivity gap.
        WindowedStream<String, String, TimeWindow> sessions = byLine.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

        // Concatenate all lines collected in the session into one output string.
        SingleOutputStreamOperator<String> joined = sessions.apply(new WindowFunction<String, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
                StringBuilder text = new StringBuilder();
                input.forEach(piece -> text.append(piece));
                out.collect(text.toString());
            }
        });

        joined.print();
        env.execute();
    }
}